package com.xiaomaoguai.fcp.pre.kepler.router.async;

import com.lmax.disruptor.WorkHandler;
import com.xiaomaoguai.fcp.pre.kepler.router.buf.Buf;
import com.xiaomaoguai.fcp.pre.kepler.router.handler.api.HandlerContext;
import com.xiaomaoguai.fcp.pre.kepler.router.handler.enums.Instruction;
import com.xiaomaoguai.fcp.pre.kepler.router.router.RouterPipeline;
import lombok.extern.slf4j.Slf4j;

/**
 * 继承AsyncWorkPool的RouterWorlPool event里面的数据为pipe 异步任务为保证线程安全event的data只能是pipe
 *
 * @author DH
 */
@Slf4j
public class RouterWorkPool extends DisruptorWorkPool<HandlerContext<?>> {

	private final static HandlerEventFactory<HandlerContext<?>> eventFactory = new HandlerEventFactory<>();

	private final static WorkHandler<HandlerEvent<HandlerContext<?>>> workHandler = new WorkHandler<HandlerEvent<HandlerContext<?>>>() {

		@Override
		public void onEvent(HandlerEvent<HandlerContext<?>> event) throws Exception {
			HandlerContext<?> hc = event.getData();
			Buf buf = hc.getBuf();
			String bufId = buf.getId();
			log.info("bufId:{} {} 开始异步执行", bufId, hc.getRouter().getRouterName());
			// 除callback以外都是pipe
			// SinglePipe是为了保证buf线程安全的产物
			// 异步调用时不能直接调用invokeHandler,防止再次创建异步任务,直接调用syncHandler
			if (hc instanceof RouterPipeline) {
				RouterPipeline pipe = (RouterPipeline) hc;
				if (pipe.isSinglePipe()) {
					hc = pipe.getNext();
				} else {
					pipe.invokeHandler();
					return;
				}
			}
			// callback时需执行当前handler后，继续执行后续handler
			Instruction instruction = null;
			try {
				instruction = hc.syncHandler();
				if (instruction == Instruction.BREAK) {
					return;
				}
			} catch (Throwable cause) {
				hc.exceptionHandler(cause);
				return;
			}
			if (instruction == Instruction.EXIT) {
				hc.getLast().invokeHandler();
				return;
			}
			hc.getNext().invokeHandler();
		}
	};

	public RouterWorkPool() {
		super(workHandler, eventFactory);
	}


	public RouterWorkPool(AsyncInfo asyncInfo) {
		super(asyncInfo.getAsyncWorkPool(), asyncInfo.getBufferSize(), asyncInfo.getWorkThreadNum(), null, workHandler, null, eventFactory);
	}

	public RouterWorkPool(String asyncWorkPool, Integer bufferSize, Integer workThreadNum) {
		super(asyncWorkPool, bufferSize, workThreadNum, null, workHandler, null, eventFactory);
	}

	public RouterWorkPool(String asyncWorkPool, Integer bufferSize, Integer workThreadNum, Integer maxQueueSize) {
		super(asyncWorkPool, bufferSize, workThreadNum, maxQueueSize, workHandler, null, eventFactory);
	}

	public RouterWorkPool(Integer workThreadNum) {
		super(workThreadNum, workHandler, eventFactory);
	}

}
